WorkflowsからDataformワークフローをポーリングしてみた
データアナリティクス事業本部の根本です。今日はWorkflowsからDataformワークフローを起動して、ポーリングしてみました。DataformワークフローをWorkflowsから起動しただけで終わるのだと物足りないなと思っていた方はぜひ読んでみてください。
この記事の対象者
- Dataformワークフローをポーリングしようと思っているひと
前提条件
- Dataform、WorkflowsAPIが使用できること
検証の全体像
- WorkflowsからDataformワークフローの実行状態を取得して、実行状態が完了になるまでポーリングする
検証用に実装するWorkflowsの処理の流れとしては、以下となります。
1. Dataformのソースをコンパイル
2. その結果を元にDataformワークフローを実行
3. Dataformワークフロー実行後は即時ワークフローの実行状態が返却される。その後実行状態を確認するAPIを数秒間隔で発行してDataformワークフローが実行完了するまでポーリング
イメージとしては以下となります。
それでは検証していきます!
やってみる
Dataform側の準備
まずはDataform側の準備を行います。Workflowsからのポーリングで実行中の状態を確認したいので10秒程度かかる処理を組みました。
以下の4つのSQLXファイルを作成しました。
- test_1.sqlx
- test_2.sqlx
- test_3.sqlx
- test_4.sqlx
各SQLXファイルの処理は単純なもので、まずtest_1
テーブルを列の値1
で作成して、それ以降は$ref
を用いて前のViewを参照して作成していくというものです。
test_1
とtest_2
のSQLXファイルだけ例に示します。
config { type: "view", } SELECT 1 AS test_1
config { type: "view", } SELECT * FROM ${ref("test_1")}
環境にもよりますが、試してみたら大体10秒前後の処理時間になっていたのでこのDataformワークフローを元にしてWorkflowsからポーリングしてみます。
Workflows側の準備
まずはWorkflowsのyamlを以下に示します。
main: steps: - init: assign: - repository: projects/プロジェクトID/locations/asia-northeast1/repositories/リポジトリ名 - createCompilationResult: call: http.post args: url: ${"https://dataform.googleapis.com/v1beta1/" + repository + "/compilationResults"} auth: type: OAuth2 body: gitCommitish: "main" result: compilationResult - createWorkflowInvocation: call: http.post args: url: ${"https://dataform.googleapis.com/v1beta1/" + repository + "/workflowInvocations"} auth: type: OAuth2 body: compilationResult: ${compilationResult.body.name} result: createWorkflowInvocationResult - checkDataform: call: http.get args: url: ${"https://dataform.googleapis.com/v1beta1/" + createWorkflowInvocationResult.body.name} auth: type: OAuth2 result: checkStatus - checkIfDone: switch: - condition: ${checkStatus.body.state == "SUCCEEDED"} return: ${"ポーリングを終了します。 STATE:" + checkStatus.body.state} - printLog: call: sys.log args: text: ${"ポーリングを開始します。STATE:" + checkStatus.body.state} severity: INFO - wait: call: sys.sleep args: seconds: 2 next: checkDataform
それではポーリング処理の箇所に関して説明していきます。
※コンパイル結果作成、ワークフロー呼び出しに関してはこちらの記事をご確認ください。
- checkDataform: call: http.get args: url: ${"https://dataform.googleapis.com/v1beta1/" + createWorkflowInvocationResult.body.name} auth: type: OAuth2 result: jobStatus
上記処理にてDataformワークフロー実行状態を取得しています。APIとしては以下になります。
リファレンスはこちらです。
Method: projects.locations.repositories.workflowInvocations.get
本APIは、Dataformワークフロー実行結果のname
を元に呼び出すことができます。
createWorkflowInvocation:
ステップにて呼び出し結果をcreateWorkflowInvocationResult
に格納して、その値を用いて以下のようにしてURLを組み立てます。
url: ${"https://dataform.googleapis.com/v1beta1/" + createWorkflowInvocationResult.body.name}
Dataformワークフローの実行結果はリファレンスより以下となるので、ポーリングを終了する状態(state)を選んで設定します。今回はSUCCEEDED
になったらポーリングを終了するようにしました。
state | 概要 |
---|---|
STATE_UNSPECIFIED | 初期値。使用されることはない |
RUNNING | ワークフローが実行中 |
SUCCEEDED | 成功 |
CANCELLED | キャンセル |
FAILED | 失敗 |
CANCELING | キャンセル中 |
注意点としてポーリングする状態、Dataformの実装、Dataformワークフロー実行結果によっては意図せぬ待機時間、実行ステップ数になってしまう可能性があるのでご注意ください。
ワークフロー実行の最長時間(開始時刻から終了時刻まで)。この上限を超えると、ワークフローはタイムアウト エラーで終了します。
引用:リソースの上限
※今回の実装は検証用なので最低限の実装しかしていません。ポーリング最大時間を制限したり、起動後にWorkflowsが停止しない場合はWorkflowsの実行をキャンセルするなどして停止させてください。
それではポーリング処理を見ていきます。
- checkIfDone: switch: - condition: ${checkStatus.body.state == "SUCCEEDED"} return: ${"ポーリングを終了します。 STATE:" + checkStatus.body.state} - printLog: call: sys.log args: text: ${"ポーリングを開始します。STATE:" + checkStatus.body.state} severity: INFO - wait: call: sys.sleep args: seconds: 2 next: checkDataform
switch
ステートメントを用いてDataformワークフローの実行状態がSUCCEEDED
かどうかを判定しています。
SUCCEEDED
以外であれば、後続のwait
処理が実行されて指定秒数(今回は2秒)待機して、再度checkDataform
ステップを実行してDataformワークフローの実行状態を取得します。
wait
処理で用いているsys.sleep
関数はWorkflowsの指定秒数待機してくれる関数です。
動かしてみる
それでは実際にWorkflowsを動かしてみます。
実行に成功していました。"ポーリングを終了します。 STATE:SUCCEEDED"
のメッセージより、DataformワークフローがSUCCEEDED
になったことも確認できました。
Workflowsのログも見てみます。
4回ポーリングをして、Dataformワークフローの状態がRUNNING
となっていることが確認できました。検証成功です。
おわりに
WorkflowsからDataformワークフローのポーリングができてよかったです。今後Dataformワークフローを起動した後、状態を追いたい場合に使っていきたいと思います。ただ、ポーリングする際にはDataform側の状態に注意して意図せぬ待機時間にならないようにWorkflows側での実装に注意する必要があります。この点は注意ポイントだと思います。
この記事がどなたかのお役に立てば幸いです。それではまた。
参考
埋め込み switch ステートメントを使用してステップを実行する
Dataform API
Workflowsリソースの上限
switchステートメントのリファレンス